{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start the beam-flink job server"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "from hops import beam as hops_beam\n",
    "# Start Beam jobservice\n",
    "hops_beam.start(taskmanager_heap_size=8192)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "MV-CWK2kI-iH"
   },
   "source": [
    "## TFMA Notebook example\n",
    "\n",
    "This notebook describes how to export your model for TFMA and demonstrates the analysis tooling it offers.\n",
    "\n",
    "Note: Please make sure to follow the instructions in [README.md](https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi/README.md) when running this notebook"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "MV-CWK2kI-iH"
   },
   "source": [
    "## Setup\n",
    "\n",
    "Import necessary packages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "xFbGgkXAJCJ7"
   },
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "from hops import beam as hops_beam\n",
    "import os\n",
    "from tfx.examples.chicago_taxi import preprocess\n",
    "import shutil\n",
    "import tensorflow as tf\n",
    "import tensorflow_data_validation as tfdv\n",
    "import tensorflow_model_analysis as tfma\n",
    "from google.protobuf import text_format \n",
    "from tensorflow.python.lib.io import file_io\n",
    "from tensorflow_transform.beam.tft_beam_io import transform_fn_io\n",
    "from tensorflow_transform.coders import example_proto_coder\n",
    "from tensorflow_transform.saved import saved_transform_io\n",
    "from tensorflow_transform.tf_metadata import dataset_schema\n",
    "from tensorflow_transform.tf_metadata import schema_utils\n",
    "from tfx.examples.chicago_taxi.trainer import task\n",
    "from tfx.examples.chicago_taxi.trainer import taxi\n",
    "from hops import hdfs as hopsfs\n",
    "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions, HadoopFileSystemOptions, PortableOptions, WorkerOptions, DebugOptions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "zpCt_emiJDeb"
   },
   "source": [
    "Helper functions and some constants for running the notebook locally."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "1Axm8YxCJF7K"
   },
   "outputs": [],
   "source": [
    "BASE_DIR = hopsfs.project_path(exclude_nn_addr=True)\n",
    "DATA_DIR = os.path.join(BASE_DIR, 'Resources/data')\n",
    "OUTPUT_DIR = os.path.join(BASE_DIR, 'Resources/taxi_out')\n",
    "TMP_DIR = os.path.join(BASE_DIR, 'Resources/taxi_tmp')\n",
    "\n",
    "# Base dir containing train and eval data\n",
    "TRAIN_DATA_DIR = os.path.join(DATA_DIR, 'train')\n",
    "EVAL_DATA_DIR = os.path.join(DATA_DIR, 'eval')\n",
    "\n",
    "# Base dir where TFT writes training data\n",
    "TFT_TRAIN_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tft_train')\n",
    "TFT_TRAIN_FILE_PREFIX = 'train_transformed'\n",
    "\n",
    "# Base dir where TFT writes eval data\n",
    "TFT_EVAL_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tft_eval')\n",
    "TFT_EVAL_FILE_PREFIX = 'eval_transformed'\n",
    "\n",
    "TF_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tf')\n",
    "\n",
    "# Base dir where TFMA writes eval data\n",
    "TFMA_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tfma')\n",
    "\n",
    "SERVING_MODEL_DIR = 'serving_model_dir'\n",
    "EVAL_MODEL_DIR = 'eval_model_dir'\n",
    "\n",
    "\n",
    "def get_tft_train_output_dir(run_id):\n",
    "    return _get_output_dir(TFT_TRAIN_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "\n",
    "def get_tft_eval_output_dir(run_id):\n",
    "    return _get_output_dir(TFT_EVAL_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "\n",
    "def get_tf_output_dir(run_id):\n",
    "    return _get_output_dir(TF_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "def get_tfma_output_dir(run_id):\n",
    "    return _get_output_dir(TFMA_OUTPUT_BASE_DIR, run_id)\n",
    "\n",
    "def _get_output_dir(base_dir, run_id):\n",
    "    return os.path.join(base_dir, 'run_' + str(run_id))\n",
    "\n",
    "def get_schema_file():\n",
    "    return os.path.join(OUTPUT_DIR, 'schema.pbtxt')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Configure the beam pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_args = hops_beam.get_portable_runner_config()\n",
    "options=PipelineOptions(flags=pipeline_args)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run TFMA to compute metrics\n",
    "For local analysis, TFMA offers a helper method ``tfma.run_model_analysis``"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def run_tfma(slice_spec, tf_run_id, tfma_run_id, input_csv, schema_file, add_metrics_callbacks=None):\n",
    "    \"\"\"A simple wrapper function that runs tfma locally.\n",
    "    \n",
    "    A function that does extra transformations on the data and then run model analysis.\n",
    "    \n",
    "    Args:\n",
    "        slice_spec: The slicing spec for how to slice the data.\n",
    "        tf_run_id: An id to contruct the model directories with.\n",
    "        tfma_run_id: An id to construct output directories with.\n",
    "        input_csv: The evaluation data in csv format.\n",
    "        schema_file: The file holding a text-serialized schema for the input data.\n",
    "        add_metrics_callback: Optional list of callbacks for computing extra metrics.\n",
    "        \n",
    "    Returns:\n",
    "        An EvalResult that can be used with TFMA visualization functions.\n",
    "    \"\"\"\n",
    "    eval_model_base_dir = os.path.join(get_tf_output_dir(tf_run_id), EVAL_MODEL_DIR)\n",
    "    if eval_model_base_dir.startswith('hdfs'):\n",
    "        eval_model_dir = hopsfs.ls(eval_model_base_dir)[0]\n",
    "    else:\n",
    "        eval_model_dir = os.path.join(eval_model_base_dir, next(os.walk(eval_model_base_dir))[1][0])\n",
    "    eval_shared_model = tfma.default_eval_shared_model(\n",
    "        eval_saved_model_path=eval_model_dir,\n",
    "        add_metrics_callbacks=add_metrics_callbacks)\n",
    "    schema = taxi.read_schema(schema_file)\n",
    "    \n",
    "    print(eval_model_dir)\n",
    "    \n",
    "    display_only_data_location = input_csv\n",
    "    \n",
    "    with beam.Pipeline(options=options) as pipeline:\n",
    "        csv_coder = taxi.make_csv_coder(schema)\n",
    "        raw_data = (\n",
    "            pipeline\n",
    "            | 'ReadFromText' >> beam.io.ReadFromText(\n",
    "                input_csv,\n",
    "                coder=beam.coders.BytesCoder(),\n",
    "                skip_header_lines=True)\n",
    "            | 'ParseCSV' >> beam.Map(csv_coder.decode))\n",
    "        \n",
    "        # Examples must be in clean tf-example format.\n",
    "        coder = taxi.make_proto_coder(schema)\n",
    "        raw_data = (\n",
    "            raw_data\n",
    "            | 'ToSerializedTFExample' >> beam.Map(coder.encode))\n",
    "\n",
    "        _ = (raw_data\n",
    "             | 'ExtractEvaluateAndWriteResults' >>\n",
    "             tfma.ExtractEvaluateAndWriteResults(\n",
    "                 eval_shared_model=eval_shared_model,\n",
    "                 slice_spec=slice_spec,\n",
    "                 output_path=get_tfma_output_dir(tfma_run_id),\n",
    "                 display_only_data_location=input_csv))\n",
    "\n",
    "    return tfma.load_eval_result(output_path=get_tfma_output_dir(tfma_run_id))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### You can also compute metrics on slices of your data in TFMA. Slices can be specified using ``tfma.slicer.SingleSliceSpec``.\n",
    "\n",
    "Below are examples of how slices can be specified."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# An empty slice spec means the overall slice, that is, the whole dataset.\n",
    "OVERALL_SLICE_SPEC = tfma.slicer.SingleSliceSpec()\n",
    "\n",
    "# Data can be sliced along a feature column\n",
    "# In this case, data is sliced along feature column trip_start_hour.\n",
    "FEATURE_COLUMN_SLICE_SPEC = tfma.slicer.SingleSliceSpec(columns=['trip_start_hour'])\n",
    "\n",
    "# Data can be sliced by crossing feature columns\n",
    "# In this case, slices are computed for trip_start_day x trip_start_month.\n",
    "FEATURE_COLUMN_CROSS_SPEC = tfma.slicer.SingleSliceSpec(columns=['trip_start_day', 'trip_start_month'])\n",
    "\n",
    "# Metrics can be computed for a particular feature value.\n",
    "# In this case, metrics is computed for all data where trip_start_hour is 12.\n",
    "FEATURE_VALUE_SPEC = tfma.slicer.SingleSliceSpec(features=[('trip_start_hour', 12)])\n",
    "\n",
    "# It is also possible to mix column cross and feature value cross.\n",
    "# In this case, data where trip_start_hour is 12 will be sliced by trip_start_day.\n",
    "COLUMN_CROSS_VALUE_SPEC = tfma.slicer.SingleSliceSpec(columns=['trip_start_day'], features=[('trip_start_hour', 12)])\n",
    "\n",
    "ALL_SPECS = [\n",
    "    OVERALL_SLICE_SPEC,\n",
    "    FEATURE_COLUMN_SLICE_SPEC, \n",
    "    FEATURE_COLUMN_CROSS_SPEC, \n",
    "    FEATURE_VALUE_SPEC, \n",
    "    COLUMN_CROSS_VALUE_SPEC    \n",
    "]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Let's run TFMA!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.logging.set_verbosity(tf.logging.INFO)\n",
    "\n",
    "tfma_result_1 = run_tfma(input_csv=os.path.join(EVAL_DATA_DIR, 'data.csv'), \n",
    "                         tf_run_id=0, \n",
    "                         tfma_run_id=0,\n",
    "                         slice_spec=ALL_SPECS,\n",
    "                         schema_file=get_schema_file())"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [],
   "name": "chicago_taxi_tfma_local_playground.ipynb",
   "provenance": [],
   "version": "0.3.2"
  },
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}